Skip to content

Upgrade Elasticsearch from 7.x to 8.18.0 #6640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .env-dist
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ CELERY_TASK_ALWAYS_EAGER=False
CSRF_COOKIE_SECURE=False
DATABASE_URL=postgres://kitsune:kitsune@postgres:5432/kitsune
DATABASE_READ_ONLY_URL=postgres://kitsune:kitsune@postgres:5432/kitsune
ES_URLS=elasticsearch:9200
ES_URLS=http://elasticsearch:9200
ES_DEFAULT_SQL_CHUNK_SIZE=1000
ES_DEFAULT_ELASTIC_CHUNK_SIZE=100
ES_RETRY_ON_TIMEOUT=True
ES_VERIFY_CERTS=False
ES_SSL_SHOW_WARN=False
ES_SNIFF_ON_START=False
ES_SNIFF_ON_CONNECTION_FAIL=False
ES_TEST_MAX_RETRIES=5
ES_TEST_TIMEOUT_MULTIPLIER=3
SESSION_COOKIE_SECURE=False
SECRET_KEY=secret
DEBUG=True
Expand Down
1 change: 1 addition & 0 deletions .env-test
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ REUSE_DB=0
ENABLE_ADMIN=True
SET_LOCALE_PATH=False
SECURE_SSL_REDIRECT=False
ES_URLS=http://elasticsearch:9200
5 changes: 4 additions & 1 deletion kitsune/community/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from django.core.cache import cache
from django.db.models import Count, F, Q
from django.db.models.functions import Now
from elasticsearch_dsl import A

if settings.ES_VERSION == 8:
from elasticsearch8.dsl import A
else:
from elasticsearch_dsl import A
from kitsune.products.models import Product
from kitsune.search.documents import AnswerDocument, ProfileDocument
from kitsune.users.models import ContributionAreas, User
Expand Down
15 changes: 12 additions & 3 deletions kitsune/questions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@
from django.urls import is_valid_path
from django.utils import translation
from django.utils.translation import pgettext
from elasticsearch import ElasticsearchException

# Use correct Elasticsearch version
if settings.ES_VERSION == 7:
from elasticsearch7 import ElasticsearchException

ESExceptionClass = ElasticsearchException
else:
from elasticsearch8 import ApiError

ESExceptionClass = ApiError
from product_details import product_details

from kitsune.flagit.models import FlaggedObject
Expand Down Expand Up @@ -595,7 +604,7 @@ def related_documents(self):
for hit in search[:3].execute().hits
]
cache.set(key, documents, settings.CACHE_LONG_TIMEOUT)
except ElasticsearchException:
except ESExceptionClass:
log.exception("ES MLT related_documents")
documents = []

Expand Down Expand Up @@ -641,7 +650,7 @@ def related_questions(self):
for hit in search[:3].execute().hits
]
cache.set(key, questions, settings.CACHE_LONG_TIMEOUT)
except ElasticsearchException:
except ESExceptionClass:
log.exception("ES MLT related_questions")
questions = []

Expand Down
53 changes: 39 additions & 14 deletions kitsune/search/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@
from dataclasses import dataclass
from dataclasses import field as dfield
from datetime import datetime
from typing import Self, Union, overload
from typing import Self, Union, overload, cast

from django.conf import settings
from django.core.paginator import EmptyPage, PageNotAnInteger
from django.core.paginator import Paginator as DjPaginator
from django.utils import timezone
from django.utils.translation import gettext as _
from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch_dsl import Document as DSLDocument
from elasticsearch_dsl import InnerDoc, MetaField
from elasticsearch_dsl import Search as DSLSearch
from elasticsearch_dsl import field
from elasticsearch_dsl.utils import AttrDict

if settings.ES_VERSION == 8:
from elasticsearch8.dsl import Document as DSLDocument
from elasticsearch8.dsl import InnerDoc, MetaField
from elasticsearch8.dsl import Search as DSLSearch
from elasticsearch8.dsl import field
from elasticsearch8.dsl.utils import AttrDict
else:
from elasticsearch_dsl import Document as DSLDocument
from elasticsearch_dsl import InnerDoc, MetaField
from elasticsearch_dsl import Search as DSLSearch
from elasticsearch_dsl import field
from elasticsearch_dsl.utils import AttrDict
from pyparsing import ParseException

from kitsune.search.config import (
Expand Down Expand Up @@ -70,7 +78,7 @@ def __init_subclass__(cls, **kwargs):
@classmethod
def search(cls, **kwargs):
"""
Create an `elasticsearch_dsl.Search` instance that will search over this `Document`.
Create an `elasticsearch.dsl.Search` instance that will search over this `Document`.

If no `index` kwarg is supplied, use the Document's Index's `read_alias`.
"""
Expand All @@ -95,11 +103,12 @@ def migrate_reads(cls):
def _update_alias(cls, alias, new_index):
client = es_client()
old_index = cls.alias_points_at(alias)

if not old_index:
client.indices.put_alias(new_index, alias)
client.indices.put_alias(index=new_index, name=alias)
else:
client.indices.update_aliases(
{
body={
"actions": [
{"remove": {"index": old_index, "alias": alias}},
{"add": {"index": new_index, "alias": alias}},
Expand Down Expand Up @@ -202,8 +211,12 @@ def to_action(self, action=None, is_bulk=False, **kwargs):

# If we are in a test environment, mark refresh=True so that
# documents will be updated/added directly in the index.
# For ES8 we need to use the string value "true" instead of boolean
if settings.TEST and not is_bulk:
kwargs.update({"refresh": True})
if settings.ES_VERSION >= 8:
kwargs.update({"refresh": "true"})
else:
kwargs.update({"refresh": True})

if not action or action == "index":
return payload if is_bulk else self.save(**kwargs)
Expand All @@ -228,10 +241,12 @@ def to_action(self, action=None, is_bulk=False, **kwargs):
elif action == "delete":
# if we have a bulk operation, drop the _source and mark the operation as deletion
if is_bulk:
if "_source" in payload:
del payload["_source"]
payload.update({"_op_type": "delete"})
del payload["_source"]
return payload
# This is a single document op, delete it
# ES8 requires specific error handling for common delete errors
kwargs.update({"ignore": [400, 404]})
return self.delete(**kwargs)

Expand Down Expand Up @@ -316,7 +331,8 @@ class inherits, relevant to the documents the child class is searching over.
"""

total: int = dfield(default=0, init=False)
hits: list[AttrDict] = dfield(default_factory=list, init=False)
# Use a single declaration with a type that works for both ES versions
hits: Union[list[AttrDict], AttrDict] = dfield(default_factory=list, init=False)
results: list[dict] = dfield(default_factory=list, init=False)
last_key: Union[int, slice, None] = dfield(default=None, init=False)

Expand Down Expand Up @@ -390,10 +406,19 @@ def run(self, key: Union[int, slice] = slice(0, settings.SEARCH_RESULTS_PER_PAGE
return self.run(key)
raise e

self.hits = result.hits
if settings.ES_VERSION >= 8:
self.hits = cast(AttrDict, result.hits)
else:
self.hits = result.hits
self.last_key = key

self.total = self.hits.total.value # type: ignore
# Handle total hits according to ES8 response format
# In ES8, total is always returned as an object with a 'value' property
if settings.ES_VERSION >= 8:
self.total = getattr(self.hits.total, "value", 0)
if isinstance(self.hits.total, dict):
self.total = self.hits.total.get("value", 0)

self.results = [self.make_result(hit) for hit in self.hits]

return self
Expand Down
7 changes: 6 additions & 1 deletion kitsune/search/documents.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from django.db.models import Count, Prefetch, Q
from elasticsearch_dsl import InnerDoc, connections, field
from django.conf import settings

if settings.ES_VERSION == 8:
from elasticsearch8.dsl import InnerDoc, connections, field
else:
from elasticsearch_dsl import InnerDoc, connections, field

from kitsune.forums.models import Post
from kitsune.questions.models import Answer, Question
Expand Down
112 changes: 88 additions & 24 deletions kitsune/search/es_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk as es_bulk
from elasticsearch.helpers.errors import BulkIndexError
from elasticsearch_dsl import Document, UpdateByQuery, analyzer, char_filter, token_filter

if settings.ES_VERSION == 8:
from elasticsearch8.dsl import Document, UpdateByQuery, analyzer, char_filter, token_filter
else:
from elasticsearch_dsl import Document, UpdateByQuery, analyzer, char_filter, token_filter

from kitsune.search import config

Expand Down Expand Up @@ -44,7 +48,6 @@ def _create_synonym_graph_filter(synonym_file_name):
filter_name,
type="synonym_graph",
synonyms_path=f"synonyms/{synonym_file_name}.txt",
# we must use "true" instead of True to work around an elastic-dsl bug
expand="true",
lenient="true",
updateable="true",
Expand Down Expand Up @@ -94,9 +97,30 @@ def es_client(**kwargs):
"""Return an ES Elasticsearch client"""
# prefer a cloud_id if available
if es_cloud_id := settings.ES_CLOUD_ID:
kwargs.update({"cloud_id": es_cloud_id, "http_auth": settings.ES_HTTP_AUTH})
kwargs.update({"cloud_id": es_cloud_id, "basic_auth": settings.ES_HTTP_AUTH})
else:
kwargs.update({"hosts": settings.ES_URLS})
# Basic ES settings that apply to all versions
es_settings = {
"hosts": settings.ES_URLS,
}

# Add settings that are specific to ES 8+
if getattr(settings, "ES_VERSION", 0) >= 8:
es_settings.update(
{
"request_timeout": settings.ES_TIMEOUT,
"retry_on_timeout": settings.ES_RETRY_ON_TIMEOUT,
# SSL settings - these are needed for ES8 which requires SSL by default
"verify_certs": settings.ES_VERIFY_CERTS,
"ssl_show_warn": settings.ES_SSL_SHOW_WARN,
}
)

if settings.ES_HTTP_AUTH:
es_settings.update({"basic_auth": settings.ES_HTTP_AUTH})

kwargs.update(es_settings)

return Elasticsearch(**kwargs)


Expand Down Expand Up @@ -134,10 +158,18 @@ def index_object(doc_type_name, obj_id):
# just return
return

kwargs = {}
# For ES8, use string "true" instead of boolean True for refresh parameter
if settings.TEST:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already have this check above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three checks for settings.TEST in the file: es_client() has one that modifies the timeout and retry settings for local testing, and index_object() and delete_object() each check it in order to set refresh to true. It didn't seem reasonable to pull that check into a helper function just for the latter two. I don't think we want refresh set to true in general, since it forces an immediate refresh, which would be expensive in prod.
I think we could remove the code I added for troubleshooting and fixing the tests (i.e. the es_client() timeout, retries, and retry_on_timeout settings), which would get rid of the check in es_client(). But we definitely want to keep the check in the other two places.

if settings.ES_VERSION >= 8:
kwargs["refresh"] = "true"
else:
kwargs["refresh"] = True

if doc_type.update_document:
doc_type.prepare(obj).to_action("update", doc_as_upsert=True)
doc_type.prepare(obj).to_action("update", doc_as_upsert=True, **kwargs)
else:
doc_type.prepare(obj).to_action("index")
doc_type.prepare(obj).to_action("index", **kwargs)


@shared_task
Expand Down Expand Up @@ -168,15 +200,17 @@ def index_objects_bulk(
# before raising an exception:
_, errors = es_bulk(
es_client(
timeout=timeout,
request_timeout=timeout,
retry_on_timeout=True,
initial_backoff=timeout,
max_retries=settings.ES_BULK_MAX_RETRIES,
),
(doc.to_action(action=action, is_bulk=True, **kwargs) for doc in docs),
chunk_size=elastic_chunk_size,
raise_on_error=False, # we'll raise the errors ourselves, so all the chunks get sent
refresh=True if settings.TEST else False, # update docs immediately when testing
refresh=(
"true"
if settings.TEST and settings.ES_VERSION >= 8
else (True if settings.TEST else False)
), # refresh parameter based on test mode and ES version
)
errors = [
error
Expand All @@ -189,29 +223,50 @@ def index_objects_bulk(

@shared_task
def remove_from_field(doc_type_name, field_name, field_value):
"""Remove a value from all documents in the doc_type's index."""
"""
Given a document type name, a field name, and a value, looks up all
documents containing that value in the specified field and removes
the value from the field (if it's a list field).
"""
doc_type = next(cls for cls in get_doc_types() if cls.__name__ == doc_type_name)

script = (
f"if (ctx._source.{field_name}.contains(params.value)) {{"
f"ctx._source.{field_name}.remove(ctx._source.{field_name}.indexOf(params.value))"
f"}}"
)
# Create script as a string
if getattr(settings, "ES_VERSION", 0) >= 8:
script_source = (
f"if (ctx._source.{field_name} != null) {{ "
f"ctx._source.{field_name}.removeAll(Collections.singleton(params.value)); "
f"}}"
)
else:
script_source = (
f"if (ctx._source.{field_name}.contains(params.value)) {{"
f"ctx._source.{field_name}.remove(ctx._source.{field_name}.indexOf(params.value))"
f"}}"
)

# Set up the update query
update = UpdateByQuery(using=es_client(), index=doc_type._index._name)
update = update.filter("term", **{field_name: field_value})
update = update.script(source=script, params={"value": field_value}, conflicts="proceed")

# Apply the script with parameters
if getattr(settings, "ES_VERSION", 0) >= 8:
update = update.script(
source=script_source, lang="painless", params={"value": field_value}
)
else:
# For ES7 and below, we need to filter documents explicitly
update = update.filter("term", **{field_name: field_value})
update = update.script(
source=script_source,
lang="painless",
params={"value": field_value},
conflicts="proceed",
)

# refresh index to ensure search fetches all matches
doc_type._index.refresh()

update.execute()

# If we are in a test environment, refresh so that
# documents will be updated/added directly in the index.
if settings.TEST:
doc_type._index.refresh()


@shared_task
def delete_object(doc_type_name, obj_id):
Expand All @@ -220,4 +275,13 @@ def delete_object(doc_type_name, obj_id):
doc_type = next(cls for cls in get_doc_types() if cls.__name__ == doc_type_name)
doc = doc_type()
doc.meta.id = obj_id
doc.to_action("delete")

kwargs = {}
# For ES8, use string "true" instead of boolean True for refresh parameter
if settings.TEST:
if settings.ES_VERSION >= 8:
kwargs["refresh"] = "true"
else:
kwargs["refresh"] = True

doc.to_action("delete", **kwargs)
12 changes: 9 additions & 3 deletions kitsune/search/fields.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from functools import partial

from django.conf import settings
from elasticsearch_dsl.field import Keyword
from elasticsearch_dsl.field import Object as DSLObject
from elasticsearch_dsl.field import Text

if settings.ES_VERSION == 8:
from elasticsearch8.dsl.field import Keyword
from elasticsearch8.dsl.field import Object as DSLObject
from elasticsearch8.dsl.field import Text
else:
from elasticsearch_dsl.field import Keyword
from elasticsearch_dsl.field import Object as DSLObject
from elasticsearch_dsl.field import Text

from kitsune.search.es_utils import es_analyzer_for_locale

Expand Down
1 change: 1 addition & 0 deletions kitsune/search/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# This file makes the directory a Python package.
Loading